/*-------------------------------------------------------------------------
 *
 * execFragment.h
 *
 * Portions Copyright (c) 2018, Tencent OpenTenBase-C Group.
 * Portions Copyright (c) 2012-2014, TransLattice, Inc.
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * src/include/executor/execFragment.h
 *
 *-------------------------------------------------------------------------
 */
#ifndef EXECFRAGMENT_H
#define EXECFRAGMENT_H

#include "access/parallel.h"
#include "executor/tqueueThread.h"
#include "forward/fnbuf.h"
#include "forward/fnbuf_internals.h"
#include "pgxc/execRemote.h"
#include "pgxc/locator.h"
#include "pgxc/pgxcnode.h"
#include "nodes/execnodes.h"
#include "nodes/plannodes.h"

/*
 * Well-known fragment indexes: the root fragment uses index 0, and -1 is the
 * parent-index sentinel (presumably assigned to the root, which has no
 * parent -- confirm against the fragment table code).
 */
#define RootFragmentIndex			0
#define RootParentFragmentIndex		(-1)

/* Non-zero when the pointed-to fragment has level == 1. */
#define IsLevelOneFragment(frag)		((frag)->level == 1)

/* Yields the under_subplan member of the pointed-to subplan. */
#define IsFragmentInSubquery(sp)		((sp)->under_subplan)

/*
 * Kind of a fragment, stored in Fragment.ftype.
 *
 * The numeric values are written out explicitly; they are the same values
 * the compiler would assign implicitly (0, 1, 2).
 */
typedef enum
{
	Fragment_Normal = 0,		/* plain fragment */
	Fragment_Cte = 1,			/* matched by IsFragmentSharedCte() */
	Fragment_GtidScan = 2		/* matched by IsFragmentGtidScan() */
} FragmentType;

/*
 * Fragment type predicates: compare the ftype member of the pointed-to
 * fragment against one FragmentType value.
 */
#define IsFragmentGtidScan(frag)	((frag)->ftype == Fragment_GtidScan)
#define IsFragmentSharedCte(frag)	((frag)->ftype == Fragment_Cte)

/*
 * Receive-side buffer state.  RemoteFragmentState points at these through
 * its recvBuffer and recvParamBuffer members.
 */
typedef struct RecvBuffer
{
	Tuplesortstate *tuplesortstate;     /* sort state for merge remote */
	TupleQueueReceiver  *queue;         /* queue for receiving buffers */
	/* presumably set when data arrives in datarow form (cf. SEND_BUFFER_DATAROW) -- confirm */
	bool	transfer_datarow;
	/*
	 * Opaque controller handle.  NOTE(review): likely the object returned by
	 * InitRemoteController(), which also returns void * -- confirm.
	 */
	void* controller;
} RecvBuffer;

/*
 * Bit values for SendBuffer.flag.  Each flag occupies its own bit, so the
 * values are written as shifts of 1.
 */
#define SEND_BUFFER_COORD		(1 << 0)	/* this buffer is sent to CN */
#define SEND_BUFFER_BROADCAST	(1 << 1)	/* broadcast this buffer to all DN */
#define SEND_BUFFER_DATAROW		(1 << 2)	/* transmitting datum in datarow form */
#define SEND_BUFFER_PARAM		(1 << 3)	/* transmitting params */

/*
 * Send-side buffer state.  RemoteFragmentState keeps these in sendBuffer,
 * sendBufferBroadcast, sendParamBuffer and extraSendBuffers.
 */
typedef struct SendBuffer
{
	uint16			fid;			/* presumably the fragment id this buffer is tagged with -- confirm */
	int				targetNodeNum;  /* send node number */
	uint8			virtualdopNum;
	uint8			flag;			/* variant flags, see macros above */
	FnBuffer		*buffers;       /* buffer id array for all target nodes */
	FnSndQueueEntry	*control;       /* controller registered at fn sender node */
} SendBuffer;

/*
 * Memory and page counters for one fragment.  SharedFragmentInfo stores an
 * array of these in its sinstrument member.
 */
typedef struct FragmentInstrumentation
{
	Size fragment_mem;                  /* in kB */
	Size executor_mem;                  /* in kB */
	/*
	 * Page counters; RemoteFragmentState carries members with the same
	 * names.  NOTE(review): exact meaning (e.g. what counts as a "disk" page)
	 * is not visible in this header -- confirm.
	 */
	int total_pages;
	int disk_pages;
} FragmentInstrumentation;

/* ----------------
 *	 Shared memory container for per-worker fragment information
 *
 *	 sinstrument is a flexible array member, so the struct has to be
 *	 allocated with room for the entries that follow the header
 *	 (presumably num_workers of them -- confirm against the allocator).
 * ----------------
 */
typedef struct SharedFragmentInfo
{
	int			num_workers;
	FragmentInstrumentation sinstrument[FLEXIBLE_ARRAY_MEMBER];
} SharedFragmentInfo;

/*
 * Pair of a node id and a virtual id.  RemoteFragmentState embeds one as
 * its nodeid member.
 */
typedef struct NodeId
{
	uint16	nodeid;
	uint8	virtualid;
} NodeId;

/*
 * Execution state of a RemoteFragment node
 *
 * The ResponseCombiner is the first member; RemoteFragmentGetController()
 * reaches the executor state through combiner.ss.ps.state.
 */
typedef struct RemoteFragmentState
{
	ResponseCombiner combiner;			/* see ResponseCombiner struct */
	bool		sendFragment;			/* is it a send fragment in this DProcess */
	bool		outPlanInit;			/* is its outer plan initialized? */
	bool 		needRecvData;			/* Does it need to receive data? */
	bool		initialized;
	bool		need_to_scan_locally;	/* need to read from local plan? (as gather) */
	struct ParallelExecutorInfo *pei;
	shm_toc    	*toc;

	bool		execOnAll;				/* should query be executed on all (true) or any (false) node specified
										 * in the execNodes list */
	bool		broadcast_flag;			/* 0 for not broadcast; 1 for broadcast */
	CombineType combineType;			/* see CombineType */
	Locator    *locator;				/* determine destination of tuples of locally executed plan */
	int 	   *dest_nodes;				/* allocate once */
	List	   *execNodes;				/* where to execute subplan */

	/* For sending fragment */
	Datum	   *disValues;
	bool	   *disIsNulls;
	SendBuffer *sendBuffer;				/* per node send buffer */
	SendBuffer *sendBufferBroadcast;	/* per group send buffer */
	NodeId		nodeid;
	uint8	   *sc_virtualid;

	/* For recv fragment */
	RecvBuffer	*recvBuffer;			/* per-node per-column recv buffer */
	RecvBuffer 	*recvParamBuffer;		/* buffer for receiving parameters */
	SendBuffer *sendParamBuffer;		/* buffer for sending parameters */
	bool        sendParam;				/* send param this loop? avoid sending twice */
	bool        sendMsg;				/* send msg this loop? avoid sending twice */

	bool		finish_init;
	MemoryContext 	tmp_cxt;			/* tmp context for send */
	MemoryContext	cxt;				/* memory context for fstate */
	int				total_pages;
	int				disk_pages;
	SharedFragmentInfo	*shared_info;	/* one entry per worker */
	ParallelWorkerStatus *parallel_status; /* Shared storage for parallel worker. */

	/* For Data skew */
	int 		roundrobin_node;		/* Mark the roundrobin node id in the list */
	HTAB* 		retain_hash;			/* Hash table for values to be sent local */
	HTAB* 		broadcast_hash;			/* Hash table for values to be broadcast */

	int 		eflags;
	/*
	 * For UNSORTED vector page only, we must split each sending source into
	 * different tapes since the vector data is HUGE data, and they will not
	 * accumulate in tqthread; these counters are used for round-robin access
	 * to the tapes.
	 */
	int 		num_tape;
	int 		num_finish;

	/*
	 * Generated by multiple "dests" in RemoteSubplan for dispatching data to
	 * multiple destinations with different fids.
	 */
	SendBuffer	**extraSendBuffers;

	/* Storage for cache send */
	int			num_table;
	Tuplestorestate **cacheSendTable;	/* presumably num_table entries -- confirm */
	Bitmapset *recvMsgNodes;
} RemoteFragmentState;

/*
 * One fragment: a subplan plus the execution state, connections and cursor
 * information attached to it.  FragmentTable holds an array of pointers to
 * these.
 */
typedef struct Fragment
{
	/*
	 * NOTE(review): fid is an int here, while SendBuffer.fid is uint16 --
	 * confirm that allocated ids always fit in 16 bits.
	 */
	int			fid; 		/* Unique fragmentId allocated from GTM */
	int 		level;		/* fragment level from root fragment */

	int 		index;		/* index into fragment table */
	int 		parentIndex;/* Parent fragment index in fragment table */
	
	Plan	   *plan;		/* subplan to execute */
	RemoteFragmentState  *pstate;

	PGXCNodeHandle	**connections; /* DProcess nodes to execute subplan */ 
	int 		connNum;	/* connection number */

	FragmentType ftype;		/* fragment type */

	char		cursor[NAMEDATALEN];	/* portal name of fragment */
	
	/* for cursor */
	int 		paramlen;	/* presumably the byte length of paramdata -- confirm */
	int 		nparam_formats;	/* presumably the entry count of param_formats -- confirm */
	char	   *paramdata;
	int16	   *param_formats;
} Fragment;

/*
 * Table of all fragments; Fragment.index and Fragment.parentIndex are
 * indexes into the fragments array.
 */
typedef struct FragmentTable
{
	Fragment  **fragments;	/* Fragment array */


	/* current assigned fragment index in the fragment array */
	int			currentAssignIndex;

	/*
	 * In some rare cases when dealing with an initplan, we'll hit the same
	 * plan multiple times; use a bitmap to avoid reallocating the fragment.
	 * This stores plan_node_id.
	 */
	Bitmapset  *allocated;

	/* NOTE(review): presumably the memory context owning the table -- confirm */
	MemoryContext ftcxt;
} FragmentTable;

/*
 * Yield the es_remote_controller member reached through a
 * RemoteFragmentState pointer, i.e.
 * fstate->combiner.ss.ps.state->es_remote_controller.
 *
 * The argument is parenthesized so that any pointer-valued expression
 * (for example "states + i" or "&local_state") expands correctly; without
 * the parentheses "->" would bind to only the last operand of the argument.
 */
#define RemoteFragmentGetController(fstate) \
	((fstate)->combiner.ss.ps.state->es_remote_controller)

/*
 * Function prototypes.  The definitions are not part of this header; the
 * grouping comments below describe only what the signatures show.
 */

/* Entry points taking a whole plan-state tree or an EState. */
extern void ExecInitFragmentTree(PlanState *planstate, int eflags);
extern void ExecEndFragmentTree(EState *estate);
extern void ExecCloseFragmentTree(EState *estate, bool free);

/*
 * Entry points for a single RemoteFragment node.  NOTE(review): the names
 * follow the usual executor node naming (Init / ReScan / Shutdown / End) --
 * confirm the expected call order against the implementation.
 */
extern RemoteFragmentState *ExecInitRemoteFragment(RemoteSubplan *node, EState *estate, int eflags);
extern void ExecFinishInitRemoteFragment(RemoteFragmentState *fragmentstate);
extern void ExecReScanRemoteFragment(RemoteFragmentState *node);
extern void ExecShutdownRemoteFragment(RemoteFragmentState *node);
extern void ExecEagerFreeRemoteFragment(PlanState *pstate);

/* Takes a PlanState and returns a tuple slot. */
extern TupleTableSlot *ExecRemoteFragment(PlanState *pstate);

extern void ExecEndRemoteFragment(RemoteFragmentState *fstate);

extern void ExecDisconnectRemoteFragment(RemoteFragmentState *fstate);

/* Set, reset and read a "local" fragment id / fragment level pair. */
extern void SetLocalFidFlevel(int fid, int flevel);
extern void ResetLocalFidFlevel(void);
extern int GetLocalFid(void);
extern int GetLocalFlevel(void);

/*
 * Size estimation, serialization into a caller-supplied address, and
 * start-up from such an address.  NOTE(review): presumably used to hand
 * fragment state to parallel workers -- confirm.
 */
extern Size EstimateFstateSpace(void);
extern void SerializeFstate(Size maxsize, char *start_address);
extern void StartParallelWorkerFstate(char *address);

/* Hooks taking a ParallelContext / ParallelWorkerContext. */
extern void ExecRemoteFragmentEstimate(RemoteFragmentState *node,
									   ParallelContext *pcxt);
extern void ExecRemoteFragmentInitializeDSM(RemoteFragmentState *node,
											ParallelContext *pcxt);
extern void ExecRemoteFragmentInitializeDSMWorker(RemoteFragmentState *node,
											ParallelWorkerContext *pwcxt);
extern void ExecRemoteFragmentAdjustDSM(RemoteFragmentState *state,
										ParallelContext *pcxt);
extern void ExecRemoteFragmentRetrieveInstrumentation(RemoteFragmentState *node);

/*
 * Remote controller.  NOTE(review): InitRemoteController() returns void *
 * while RemoteControllerBindListen() takes a RemoteFragmentController * --
 * presumably the same object; confirm.
 */
extern void* InitRemoteController(EState *estate);
extern void RemoteControllerBindListen(RemoteFragmentController *control, Fragment *fragment);

/*
 * Boolean switches defined elsewhere.  enable_exec_fragment_print is the
 * flag tested by DEBUG_FRAG().  NOTE(review): presumably these are
 * configuration (GUC) variables -- confirm.
 */
extern bool enable_exec_fragment_print;
extern bool enable_exec_fragment_critical_print;
extern bool force_transfer_datarow;

/*
 * Run the statement "stmt" only while enable_exec_fragment_print is set.
 *
 * The do { ... } while (0) wrapper makes the expansion a single statement,
 * so the macro can be followed by a semicolon and used inside an unbraced
 * if/else.  unlikely() hints that the flag is normally off.
 */
#define DEBUG_FRAG(stmt) \
	do { \
		if (unlikely(enable_exec_fragment_print)) \
		{ \
			stmt; \
		} \
	} while (0)

#endif							/* EXECFRAGMENT_H  */
